Skip to main content

Hadoop 基础知识

Hadoop

MapReduce logical data flow

数据处理分为两个阶段:Map和Reduce

Map阶段会对数据进行清洗,过滤以及解析。

然后经过mapreduce进行加工,再传给reduce function

最后得到结果

MapReduce Job

一个Job 包含输入数据,配置,以及MapReduce Program

Hadoop负责把job拆分成一系列Tasks,这些task只有两种类型:Map和Reduce

这些task被YARN调度,在节点上运行。如果Task失败了,它会自动被安排在另一个节点上运行。

(注:这个Yarn和nodejs的yarn容易发生命令行冲突,因此也可以用yarnpkg来调用)

Splits

Hadoop会把数据纵向拆开,分给不同的task去做map,这样可以提升效率。

对大多数Job来说,Split size就是HDFS Block的大小,128MB

如果大于128MB,就有可能会出现结果存在多个Block中,可能会存在多个节点里,出现网络请求。

Map Task的结果是写在本地磁盘,不是写在HDFS里的,因为它属于中间结果

Data Flow

MapReduce data flow with a single reduce task

虚线代表本地数据流,实线代表跨Node数据流。

Multiple Tasks(shuffle)

MapReduce data flow with multiple reduce tasks

Combiner function

除了Mapper和Reducer,还可以根据实际情况选择Combiner 来减少数据的交换。

比如求最大值,就可以用Combiner,在每个Map结束后直接调用,最后的结果不变,但参与网络交换的数据大大减少了。

max(0, 20, 10, 25, 15) = max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25

当然,不是所有问题都能够用得到。

Hadoop Streaming

理论上,只要是能够读取/写入 Unix标准IO的都能够使用Hadoop Streaming,所以不一定要用Java。

Hadoop Distributed Filesystem (HDFS)

Hadoop其实也可以和其他文件系统结合,比如Amazon S3或者本地文件系统。

HDFS的特点

支持超大文件、民用硬件、流式文件存取

HDFS的缺点

延迟(挖坑 HBase)、大量小文件

Append-only,不支持随机写入,不支持Multi writer

Blocks

HDFS的文件都被Chunk成Block进行存储,每个Block 128MB。当然,不像磁盘的4K 块大小,小于128M的文件在HDFS里并不会占有128MB,而是文件原来的大小。

之所以HDFS的块那么大,就是为了最小化seek time,也就是寻找块的时间。

Namenodes and Datanodes

Namenode: master

负责管理文件系统的namespace,filesystem tree,metadata,directories。

存储在本地,namespace image和edit log

并且,在系统启动时,还会通过datanode建立一个文件块和datanode的位置映射,但是是非持久化的。

Namenode如果挂了,数据就全丢失了。因为没有办法知道如何从块重建文件。

第一种办法,就是备份Namenode的state,写到本地磁盘或者nfs。

第二种办法,就是另外运行一个node来定时merge namespace image和edit log。

Datanode: workers

存取块,接受namenode或者client的请求。

Block Caching

指定一些经常使用的Block,放入内存中作为Cache

Memory

1,000 MB per million blocks of storage

a 200-node cluster with 24 TB of disk space per node, a block size of 128 MB, and a replication factor of 3

>>> 200* 24000000 / 128 / 3 / 1000000 * 1000
12500.0
Federation

HDFS Federation, 2.x版本开始, 多个Namenode分别管理集群中的一部分

比如Node 0管理/user,Node 1 管理/share

HDFS HA

NFS或者QJM,来让一对热备的Namenode能够同时访问到edit log,然后能够接管

QJM就是一个小型的HDFS,提供高可用edit log

Failover

ZooKeeper确定只有一个Namenode是活动的

Fencing

假设原有的Node依然在运行,但由于网络原因被认为失效了,触发了Failover,需要一些机制来确保这个Node不造成太多的伤害。这个过程被称为Fencing

QJM只允许同时有一个Namenode写edit log,但是NFS没办法做到

STONITH, or “shoot the other node in the head,”

直接控制电源让另一个host关机

简单命令

From Local

hadoop fs -copyFromLocal input/docs/quangle.txt /user/tom/quangle.txt

To Local

hadoop fs -copyToLocal quangle.txt quangle.copy.txt

Data Flow

Client Access Mode

Accessing HDFS over HTTP directly and via a bank of HDFS proxies

File Write

A client writing data to HDFS

NameNode会先去check 文件是否存在,是否有权限等。如果check通过了,会增加一条new file的record。

dfs.namenode.replication.min replicas (which defaults to 1) are written, 就会success

asynchronously replicated across the cluster, dfs.replication, which defaults to 3

File Read

A client reading data from HDFS

Distances
  • distance(/d1/r1/n1, /d1/r1/n1) = 0 (processes on the same node)
  • distance(/d1/r1/n1, /d1/r1/n2) = 2 (different nodes on the same rack)
  • distance(/d1/r1/n1, /d1/r2/n3) = 4 (nodes on different racks in the same data center)
  • distance(/d1/r1/n1, /d2/r3/n4) = 6 (nodes in different data centers)
Replica Strategy

A typical replica pipeline

同Node,Node On other rack, other node on that rack

Flush

正在写的Block,很可能还是读取不了的状态,需要达到一个Block的大小后缓存才会被Flush。当然也有API可以Force Flush

Parallel Copying

hadoop distcp file1 file2

distcp本身是一个MapReduce Job,每个file都被一个map copy。

YARN (Yet Another Resource Negotiator)

Yarn 是Hadoop的集群资源管理系统。

YARN applications

提供一个API来访问集群的资源,通常是被更高级的API调用的。

How YARN runs an application

这个Container可以是Unix Process,也可以是Linux的cgroup

Resource Requests

Yarn Application可以在运行前就申请好所有资源,或者在运行时动态地申请更多的资源。

Spark就是使用的前一种方式

Application Lifespan

App per job

App per workflow/session of jobs

Long-running

Spark采用的第二种

Mapreduce 1 and Yarn
MapReduce 1YARN
JobtrackerResource manager, application master, timeline server
TasktrackerNode manager
SlotContainer
4000 nodes and 40000 tasks10000 nodes and 100000 tasks
Scheduler Options

FIFO、Capacity、Fair

Cluster utilization over time when running a large job and a small job under the FIFO Scheduler (i), Capacity Scheduler (ii), and Fair Scheduler (iii)

Capacity scheduler的资源是在配置里定义的

Fair Scheduler:用户A 和用户B 如下图

Fair sharing between user queues

资源在用户之间也是Fair shared。

Dominant Resource Fairness

这个很有意思,是针对任务使用异构资源的算法。

假如集群有100CPU 10T 内存,A任务需求 2CPU 300G内存,B任务需求6CPU 100G内存

它是按照资源需求占集群总资源的百分比算的,A占2% 3%,B占6% 1%

所以B的Container数量会比A少一半,来达到平衡

默认DRF是关闭的。

Data Integrity

HDFS默认会在数据写入后计算checksum,然后在读取时验证。

默认每512byte就进行一次CRC-32C checksum,存一个4byte的哈希值。

Datanode 负责在数据写入之前验证checksum,然后在读取时也是一样。每个Datanode都有一个persistent log来记录verification,记录最后一次每个Block验证的时间。Client验证Block后,会回传结果给Datanode更新log。

除了在读写操作以外,DataBlockScanner还会在后台定期检查验证blocks。

当发现出错后,由于HDFS有多个备份,就可以把一个好的备份复制过来。

Compression
Compression formatToolAlgorithmFilename extensionSplittable?
DEFLATE[a]N/ADEFLATE.deflateNo
gzipgzipDEFLATE.gzNo
bzip2bzip2bzip2.bz2Yes
LZOlzopLZO.lzoNo[b]
LZ4N/ALZ4.lz4No
SnappyN/ASnappy.snappyNo
Splitting

如果压缩文件大于128MB,那么就会被分成多个Block。然而,不是所有格式都支持任意区域开始解压缩。比如gzip,就必须同一个Map完成连续多个Block的读取来完成解压。

Bz2,LZO都是可以分块的,这样可以分布式地处理

SequenceFile

The internal structure of a sequence file with no compression and with record compression

The internal structure of a sequence file with block compression

Run a MapReduce Job

How Hadoop runs a MapReduce job

Streaming

The relationship of the Streaming executable to the node manager and the task container

Shuffle and Sort

Shuffle and sort in MapReduce

Map Side

每个Map都有一个memory buffer,默认是100MB。

Map会将输出写到这个buffer里,每当buffer写满后,就会触发spill,创建一个磁盘上的spill文件然后清空buffer。这个buffer是分区(partitioned)的,对应下游的reducer。然后对每个partition,后台会有一个进程对其中的key进行sort,然后再对sort运行一边combiner。

多个Spill文件会被合成为一个分块的有序文件。如果合成前spill 文件大于3个,combiner会再次运行,否则不会运行。在此过程中,数据也会被压缩。

Reduce Side

当Map 任务完成后,会向Application Master发送消息,然后就知道了Map output和host的关系。只有当Application Master向map host发送消息要求删除结果后,结果才会被删除。

Copy phase:从map output复制结果,后台有多个copier threads,默认是5个。

当copies被复制到磁盘上,就会开始merge。这个过程由merge factor控制,如果默认值10,收到了50个结果,就要merge 5次后形成5个文件。最后这5个文件不会被merge,而是直接送到reducer

Efficiently merging 40 file segments with a merge factor of 10

如果是40个文件:第一次4个,然后10、10、10 这四次产生的4个文件和最后剩下的6个合并成为一次merge。

这样做是为了最小化io次数。

Speculative Execution

如果hadoop在运行过程中发现哪个task比预期执行地慢,就会启动一个同样内容的backup 任务。谁先完成,另一个都会被kill掉。这部分只会占很小一部分,显著慢于其他任务时才会触发。

Filesystem Image and Edit log

当客户端发起写请求,这个操作会被记录在edit log中。namenode在内存中也会有一个metadata的数据结构,在edit log修改后更新。这个内存中的metadata会用来响应读请求。

Edit log事实上是很多文件,如edits_inprogress_0000000000000000020 后缀是transaction ID,前缀是edit。

每次只有一个文件打开可以写入,在事务完成后flush,sync。

fsimage是 metadata的完全checkpoint,通常很大,GB级别。内容包含文件系统中所有文件夹和文件的inode,代表metadata。fsimage不包含block在哪个datanode上存储。

为了不让Edit log过多,只能另起一个namenode,定时地取最新的fsimage和edit log,然后合并成最新的fsimage回传给namenode,这样edit log就不会太多。

The checkpointing process

Safe mode

只对client提供文件系统的Read-only view

Balancer

把空间使用率最高的node上一部分block移动到最低的node

start-balancer.sh

安装

brew install hadoop

注意安装的位置

本书的代码可以在github上找到,repo owner就是书的作者

tomwhite/hadoop-book: Example source code accompanying O'Reilly's "Hadoop: The Definitive Guide" by Tom White (github.com)

hadoop-book git:(master) ✗ hadoop jar /opt/homebrew/Cellar/hadoop/3.3.3/libexec/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02-mr-intro/src/main/python/max_temperature_map.py \
-reducer ch02-mr-intro/src/main/python/max_temperature_reduce.py

Installing Hadoop on a Mac. The only guide you will ever need! | by Siphu Langeni, MS | Towards Data Science

$ open core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
$ open hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
$ open mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.application.classpath</name> <value>/opt/homebrew/Cellar/hadoop/3.3.3/libexec/share/hadoop/mapreduce/*:/opt/homebrew/Cellar/hadoop/3.3.3/libexec/share/hadoop/mapreduce/lib/*</value>
</property>
</configuration>
open yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.env-whitelist</name> <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
</property>
</configuration>

Ssh 本地可以跑通,在设置里打开权限

相关项目(深坑)

Avro

数据序列化系统

Parquet

列存储格式

Flume

数据Ingestion

Sqoop

从关系型数据库提取数据

Pig

处理大型数据集

Hive

Data warehouse

Crunch

Map Reduce高级API

Spark

大数据处理引擎

HBase

Big table

这些有空再更新吧……